#include "config.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <rpc/rpc.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <stdint.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <dirent.h>

#define DBG_SUBSYS S_LIBINTERFACE

#include "ynfs_conf.h"
#include "attr.h"
#include "error.h"
#include "fh.h"
#include "job_dock.h"
#include "nfs_job_context.h"
#include "net_global.h"
#include "nfs_conf.h"
#include "stat_cache.h"
#include "nfs_events.h"
#include "../storage/storage/vnode.h"
#include "nfs_state_machine.h"
#include "readdir.h"
#include "core.h"
#include "sunrpc_proto.h"
#include "sunrpc_reply.h"
#include "xdr_nfs.h"
#include "configure.h"
#include "sysy_conf.h"
#include "lichstor.h"
#include "dbg.h"

/* write verifier */
char wverf[NFS3_WRITEVERFSIZE];
uint32_t nfs_read_max_size = NFS_TCPDATA_MAX;
extern worker_handler_t jobtracker;

typedef enum {
        ACCEPT_STATE_OK, /* 0 */
        ACCEPT_STATE_ERROR, /* 1 */
} acceptstate_t;

/* generate write verifier based on PID and current time */
void regenerate_write_verifier(void)
{
        *(wverf + 0) = (uint32_t) getpid();
        *(wverf + 0) ^= rand();
        *(wverf + 4) = (uint32_t) gettime();
}

static void __nfs_time_trans(const set_time *set_time, __set_time *_time, int init)
{
        if (set_time->set_it == SET_TO_SERVER_TIME || init) {
                DINFO("set server time\n");
                _time->time.seconds = gettime();
                _time->set_it = 1;
        } else if (set_time->set_it == SET_TO_CLIENT_TIME) {
                DINFO("set client time %u\n", _time->time.seconds);
                _time->time.seconds = set_time->time.seconds;
                _time->time.nseconds = set_time->time.nseconds;
                _time->set_it = 1;
        } else {
                DINFO("no change time\n");
                _time->set_it = 0;
        }
}

static void __nfs_attr_trans(const sattr *attr, setattr_t *setattr, int init)
{
        memset((void *)setattr, 0x0, sizeof(*setattr));

        if (attr->mode.set_it) {
                setattr->mode.val = attr->mode.mode;
                setattr->mode.set_it = 1;               
        }

        if (attr->uid.set_it) {
                setattr->uid.val = attr->uid.uid;
                setattr->uid.set_it = 1;               
        }
 
        if (attr->gid.set_it) {
                setattr->gid.val = attr->gid.gid;
                setattr->gid.set_it = 1;               
        }

#if 1
        if (attr->size.set_it) {
                setattr->size.size = attr->size.size; 
                setattr->size.set_it = 1;               
        }
#endif 

        __nfs_time_trans(&attr->atime, &setattr->ctime, init);
        __nfs_time_trans(&attr->atime, &setattr->atime, init);
        __nfs_time_trans(&attr->mtime, &setattr->mtime, init);

        if (init) {
                __nfs_time_trans(&attr->atime, &setattr->btime, init);
        }
}

void nfs3_null_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        (void) nfs_arg;
        (void) _buf;
        DINFO("null\n");
        sunrpc_reply_send(sockid, xid, NULL, NULL, ACCEPT_STATE_OK);
}

void nfs3_getattr_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret, retry = 0;
        post_op_attr attr;
        getattr_args *args;
        getattr_ret res;
        fileid_t *fileid;
        struct stat stbuf;

        args = &nfs_arg->getattr_arg;
        fileid = (fileid_t *)args->obj.val;

        DINFO("getattr "CHKID_FORMAT"\n", CHKID_ARG(fileid));

retry:
        ret = stor_getattr("default", fileid, &stbuf);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        if (retry > MAX_RETRY) {
                                GOTO(err_ret, ret);
                        }

                        DWARN("fileid "CHKID_FORMAT" mode %o\n",
                              CHKID_ARG(fileid), attr.attr.mode);
                        retry++;
                        schedule_sleep("nfs",5 * 1000 * 1000);
                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }

        attr.attr_follow = TRUE;
        get_postopattr_stat(&attr, &stbuf);

        DBUG("fileid "CHKID_FORMAT" mode %o size %llu isdir %u\n", CHKID_ARG(fileid),
             attr.attr.mode, (LLU)stbuf.st_size, S_ISDIR(stbuf.st_mode));

        res.attr = attr.attr;
        res.status = NFS3_OK;
        FREE_ARGS(_buf, getattr);
        sunrpc_reply_send(sockid, xid, (void *)xdr_getattrret, &res, ACCEPT_STATE_OK);

        return;
err_ret:
        DINFO("nfs getattr "CHKID_FORMAT" fail ret:%d\n", CHKID_ARG(fileid), ret);
        FREE_ARGS(_buf, getattr);
        res.status = NFS3_EIO;
        sunrpc_reply_send(sockid, xid, (void *)xdr_getattrret, &res, ACCEPT_STATE_OK);
        return;
}

void nfs3_setattr_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret, retry = 0;
        setattr3_res res;
        preop_attr pre;
        post_op_attr attr;
        setattr3_args *args;
        fileid_t *fileid;
        sattr *attr1;
        setattr_t setattr;

        (void) retry;

        args = &nfs_arg->setattr3_arg;
        fileid = (fileid_t *)args->obj.val;

        DINFO("setattr "CHKID_FORMAT"\n", CHKID_ARG(fileid));

        get_preopattr1(fileid, &pre);

        res.setattr3_res_u.resok.obj_wcc.before = pre;

        res.status = NFS3_OK;

        attr1 = &args->new_attributes;

        DBUG("set attr "CHKID_FORMAT" uid %u gid %u mode %u size %u mtime %u, atime %u\n",
             CHKID_ARG(fileid), attr1->uid.set_it, attr1->gid.set_it,
             attr1->mode.set_it, attr1->size.set_it,
             attr1->mtime.set_it, attr1->atime.set_it);

        if (args->guard.check) {
                DWARN("check!!!!!!!!!!!!!!!!!!!!!\n");
        }

        __nfs_attr_trans(&args->new_attributes, &setattr, 0);
        ret = vnode_setattr("default", fileid, &setattr);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (setattr.size.set_it) {
                ret = vnode_update("default", fileid, setattr.size.size, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        get_postopattr1(fileid, &attr);

        DBUG("increase %d\n", (int)(attr.attr.size - pre.attr.size));

        res.setattr3_res_u.resok.obj_wcc.after = attr;
        sunrpc_reply_send(sockid, xid, (void *)xdr_setattrret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, setattr);

        return;
err_ret:
        DERROR("nfs setattr "CHKID_FORMAT" uid %u gid %u mode %u size %u mtime %u, atime %u fail ret:%d\n",
             CHKID_ARG(fileid), attr1->uid.set_it, attr1->gid.set_it,
             attr1->mode.set_it, attr1->size.set_it,
             attr1->mtime.set_it, attr1->atime.set_it, ret);
        res.status = setattr_err(ret);
        res.setattr3_res_u.resfail.obj_wcc.after.attr_follow = FALSE;
        res.setattr3_res_u.resfail.obj_wcc.before.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_setattrret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, setattr);
        return;
}

void nfs3_lookup_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret, retry = 0;
        fileid_t *parent, fileid;
        lookup_args *args;
        lookup_ret res;

        (void) retry;

        args = &nfs_arg->lookup_arg;
        parent = (fileid_t *)args->dir.val;

        DINFO("parent "CHKID_FORMAT" name %s\n", CHKID_ARG(parent), args->name);

        if (strlen(args->name) > MAX_NAME_LEN) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        if (_strcmp(args->name, ".") == 0 || _strcmp(args->name, "./") == 0) {
                fileid = *parent;
                ret = 0;
                res.status = NFS3_OK;
        } else {
                ret = stor_lookup("default", parent, args->name, &fileid);
                if (unlikely(ret)) {
                        DBUG("parent "CHKID_FORMAT" name %s not found\n",
                             CHKID_ARG(parent), args->name);
                        GOTO(err_ret, ret);
                }

                res.status = NFS3_OK;
        }

        DBUG("XXX parent "CHKID_FORMAT" name %s id "CHKID_FORMAT"\n",
             CHKID_ARG(parent), args->name, CHKID_ARG(&fileid));

        res.u.ok.obj.len = sizeof(fileid_t);
        res.u.ok.obj.val = (char *)&fileid;
        get_postopattr1(&fileid, &res.u.ok.obj_attr);
        get_postopattr1(parent, &res.u.ok.dir_attr);

        sunrpc_reply_send(sockid, xid, (void *)xdr_lookupret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, lookup);

        return;
err_ret:
        DERROR("nfs lookup "CHKID_FORMAT"/%s fail ret:%d\n", CHKID_ARG(parent), args->name, ret);
        res.status = lookup_err(ret);
        res.u.fail.dir_attr.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_lookupret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, lookup);
        return;
}

void nfs3_access_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        post_op_attr post;
        mode_t mode __attribute__((unused));
        int access = 0;
        access_args *args;
        access_ret res;
        fileid_t *fileid;

        args = &nfs_arg->access_arg;
        fileid = (fileid_t *)args->obj.val;

        DINFO("access "CHKID_FORMAT"\n", CHKID_ARG(fileid));

        get_postopattr1(fileid, &post);

        mode = post.attr.mode;

        /* owner, group, other, and root are allowed everything */
        access |= ACCESS_READ | ACCESS_MODIFY | ACCESS_EXTEND | ACCESS_EXECUTE;

        /* adjust if directory */
        if (post.attr.type == NFS3_DIR) {
                if (access & ACCESS_READ)
                        access |= ACCESS_LOOKUP;
                if (access & ACCESS_MODIFY)
                        access |= ACCESS_DELETE;
                access &= ~ACCESS_EXECUTE;
        }

        res.status = NFS3_OK;
        res.u.ok.access = access & args->access;
        res.u.ok.obj_attr = post;
        sunrpc_reply_send(sockid, xid, (void *)xdr_accessret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, access);

        return;
}

STATIC int __stor_read__(va_list ap)
{
        int ret;
        const volid_t *volid = va_arg(ap, volid_t *);
        buffer_t *buf = va_arg(ap, buffer_t *);
        size_t size = va_arg(ap, size_t);
        off_t offset = va_arg(ap, off_t);

        va_end(ap);

        ret = stor_read("default", volid, buf, size, offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        return 0;
err_ret:
        return ret;
}

STATIC int __stor_read(const volid_t *volid, buffer_t *buf, size_t size, off_t offset)
{
        int ret, retry = 0;

retry:
        ret = core_request(core_hash(volid), -1, "nfs3_read", __stor_read__,
                           volid, buf, size, offset);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 100, (1000 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

void nfs3_read_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret, retry = 0, eof;
        read_args *args;
        struct stat stbuf;
        read_ret res;
        fileid_t *fileid;
        buffer_t buf;

        args = &nfs_arg->read_arg;
        fileid = (fileid_t *)args->file.val;

        DBUG("read "CHKID_FORMAT" (%llu, %d)\n",
                        CHKID_ARG(fileid), (LLU)args->offset, args->count);

retry:
        ret = stor_getattr("default", fileid, &stbuf);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 100, (1000 * 1000));
                } else {
                        GOTO(err_ret, ret);
                }
        }

        if (args->offset >= (LLU)stbuf.st_size) {
                DWARN("read after offset off %llu size %llu fileid "CHKID_FORMAT"\n",
                      (LLU)args->offset,
                      (LLU)stbuf.st_size, CHKID_ARG(fileid));

                eof = 1;
                ret = 0;
                mbuffer_init(&buf, 0);

                goto read_zero;
        }

        if (args->count + args->offset > (LLU)stbuf.st_size) {
                DBUG("read offset %llu count %u, file len %llu\n", (LLU)args->offset,
                     args->count, (LLU)stbuf.st_size);
                args->count = stbuf.st_size - args->offset;
                eof = 1;
        } else
                eof = 0;

        if (args->count > nfs_read_max_size) {
                DWARN("request too big %u\n", args->count);
                args->count = nfs_read_max_size;
        }

        mbuffer_init(&buf, 0);

        ret = __stor_read(fileid, &buf, args->count, args->offset);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        YASSERT(args->count == buf.len);

read_zero:
        res.status = NFS3_OK;
        res.u.ok.count = buf.len;
        res.u.ok.data.len = buf.len;
        res.u.ok.eof = eof;
        res.u.ok.data.val = (void *)&buf;
        res.status = NFS3_OK;

        /* overlaps with resfail */
        get_postopattr1(fileid, &res.u.ok.attr);

        FREE_ARGS(_buf, read);
        sunrpc_reply_send(sockid, xid, (void *)xdr_readret, &res, ACCEPT_STATE_OK);

        return;
err_ret:
        DERROR("nfs read "CHKID_FORMAT" (%llu, %u) fail ret:%d\n",
                        CHKID_ARG(fileid), (LLU)args->offset, args->count, ret);
        res.status = read_err(ret);
        res.u.fail.attr.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_readret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, read);
        return;
}

void nfs3_rename_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret;
        rename_args *args;
        char *from, *to;
        rename_ret res;
        fileid_t *fromdir, *todir, from_fileid, to_fileid;
        struct stat stfrom, stto;

        args = &nfs_arg->rename_arg;
        from = args->from.name;
        to = args->to.name;

        fromdir = (fileid_t *)args->from.dir.val;
        todir = (fileid_t *)args->to.dir.val;

        DINFO(" from "CHKID_FORMAT" %s to "CHKID_FORMAT" %s\n",
              CHKID_ARG(fromdir), from,
              CHKID_ARG(todir), to);

        get_preopattr1(fromdir, &res.u.ok.from.before);
        get_preopattr1(todir, &res.u.ok.to.before);

retry:
        ret = stor_rename("default", fromdir, from, todir, to);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        ret = stor_lookup("default", fromdir, from, &from_fileid);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                        ret = stor_getattr("default", &from_fileid, &stfrom);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        ret = stor_lookup("default", todir, to, &to_fileid);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                        ret = stor_getattr("default", &to_fileid, &stto);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        if (S_ISREG(stfrom.st_mode) && S_ISREG(stto.st_mode)) {
#if 1
                                ret = stor_rmvol(todir, to, 0);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);

                                goto retry;
#endif
                        }

                        ret = EEXIST;
                        GOTO(err_ret, ret);
                } else
                        GOTO(err_ret, ret);
        }

        res.status = NFS3_OK;

        get_postopattr1(fromdir, &res.u.ok.from.after);
        get_postopattr1(todir, &res.u.ok.to.after);

        FREE_ARGS(_buf, rename);

        sunrpc_reply_send(sockid, xid, (void *)xdr_renameret, &res, ACCEPT_STATE_OK);

        return;
err_ret:
        DERROR("nfs rename from "CHKID_FORMAT"/%s to "CHKID_FORMAT"/%s fail ret:%d\n",
              CHKID_ARG(fromdir), from,
              CHKID_ARG(todir), to, ret);
        res.status = rename_err(ret);
        res.u.fail.from.after.attr_follow = FALSE;
        res.u.fail.to.after.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_renameret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, rename);
        return;
}

static inline void __check(buffer_t *buf, uint64_t offset, uint32_t size)
{
        int ret;
        uint64_t pos = (12 * 64 + 49) * 1048576 + 4096 * 145 + 265;
        uint8_t *ptr;

        if (offset <= pos && offset + size >= pos) {
                ret = ymalloc((void **)&ptr, 524288 * 2);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                mbuffer_get(buf, ptr, buf->len);

                DWARN("area (%llu,%llu) len %u %u, pos %llx is %x !!!!!!!!!!!!!!!!!!\n", (LLU)offset,
                      (LLU)offset + size,
                      size, buf->len, (LLU)pos, ptr[pos - (offset)]);

                yfree((void **)&ptr);
        }
}

STATIC int __stor_write__(va_list ap)
{
        int ret;
        const volid_t *volid = va_arg(ap, volid_t *);
        const buffer_t *buf = va_arg(ap, buffer_t *);
        size_t size = va_arg(ap, size_t);
        off_t offset = va_arg(ap, off_t);

        va_end(ap);

        ret = stor_write("default", volid, buf, size, offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);
 
        ret = vnode_update("default", volid, offset + size, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        return 0;
err_ret:
        return ret;
}

STATIC int __stor_write(const volid_t *volid, const buffer_t *buf, size_t size, off_t offset)
{
        int ret, retry = 0;

retry:
        ret = core_request(core_hash(volid), -1, "nfs3_write", __stor_write__,
                           volid, buf, size, offset);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 100, (1000 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

void nfs3_write_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret;
        write_args *args;
        write_ret res;
        fileid_t *fileid;
        buffer_t *buf;
        preop_attr attr;

        args = &nfs_arg->write_arg;
        fileid = (fileid_t *)args->file.val;

        DBUG("write "CHKID_FORMAT" off %llu size %u\n",
             CHKID_ARG(fileid), (LLU)args->offset, args->data.len);

        buf = (void *)args->data.val;

        get_preopattr1(fileid, &attr);

        if (args->data.len == 0) {
                DWARN("write "CHKID_FORMAT" off %llu size %u\n",
                      CHKID_ARG(fileid), (LLU)args->offset, args->data.len);
                goto zero_write;
        }

        ret = __stor_write(fileid, buf, args->data.len, args->offset);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

zero_write:
        args->stable = FILE_SYNC;

        buf = (void *)args->data.val;
        mbuffer_free(buf);

        //YASSERT(ret == (int)args->data.len);

        res.status = NFS3_OK;
        res.u.ok.count = args->data.len;
        res.u.ok.committed = args->stable;
        _memcpy(res.u.ok.verf, wverf, NFS3_WRITEVERFSIZE);

        DBUG("write %u\n", res.u.ok.count);

        res.u.ok.file_wcc.before = attr;
        get_postopattr1(fileid, &res.u.ok.file_wcc.after);

        FREE_ARGS(_buf, write);
        sunrpc_reply_send(sockid, xid, (void *)xdr_writeret, &res, ACCEPT_STATE_OK);

        return;
err_ret:
        DERROR("nfs write "CHKID_FORMAT" (%llu, %u) fail ret:%d\n",
             CHKID_ARG(fileid), (LLU)args->offset, args->data.len, ret);
        res.status = write_err(ret);
        res.u.fail.file_wcc.before.attr_follow = FALSE;
        res.u.fail.file_wcc.after.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_writeret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, write);
        return;
}

static int  __nfs3_create_exclusive(const create_args *args, fileid_t *_fileid)
{
        int ret, empty;
        const fileid_t *parent;
        const char *name;
        fileid_t fileid;
        struct stat stbuf;
        time_t mtime, atime;
        const sattr *attr;

        parent = (fileid_t *)args->where.dir.val;
        name = args->where.name;

        DINFO("file "CHKID_FORMAT"/%s exist, check exclusive\n",
              CHKID_ARG(parent), name);

        ret = stor_lookup("default", parent, name, &fileid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = stor_getattr("default", &fileid, &stbuf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (S_ISDIR(stbuf.st_mode)) {
                ret = EISDIR;
                GOTO(err_ret, ret);
        }

        attr = &args->how.attr;
        if (attr->mtime.set_it) {
                mtime = attr->mtime.time.seconds;
        } else
                mtime = 0;

        if (attr->atime.set_it) {
                atime = attr->atime.time.seconds;
        } else
                atime = 0;

        if (args->how.mode == EXCLUSIVE) {
                if ((stbuf.st_mode & 01777) == EXCLUSIVE
                    &&stbuf.st_mtime == mtime && stbuf.st_atime == atime) {
                        DINFO("exclusive "CHKID_FORMAT"/%s resume\n",
                              CHKID_ARG(parent), name);
                } else {
                        /*
                          DWARN("EXCLUSIVE %s mtime %u:%u atime %u:%u mode %u\n",
                          (LLU)fileid.id, fileid.version,
                          stbuf.st_mtime, mtime, stbuf.st_atime, atime,
                          stbuf.st_mode & 01777);
                        */
                        ret = EEXIST;
                        GOTO(err_ret, ret);
                }
        } else {
                ret = stor_isempty(&fileid, &empty);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                if (!empty) {
                        ret = EEXIST;
                        GOTO(err_ret, ret);
                }

                DINFO("exclusive "CHKID_FORMAT"/%s resume\n",
                      CHKID_ARG(parent), name);
        }

        *_fileid = fileid;

        return 0;
err_ret:
        return ret;
}

static void __nfs_create_exclusive_trans(create_args *args)
{
	//uint64_t *tmp;
	uint32_t *tmp1, *tmp2;
        //uint64_t verf;
        sattr *attr;

        attr = &args->how.attr;

        //tmp = (uint64_t *)args->how.verf;
        //verf = *tmp;

        tmp1 = (uint32_t *)args->how.verf;


        attr->atime.time.seconds = *tmp1;
        attr->atime.set_it = 1;

        tmp2 = (uint32_t *)&args->how.verf[4];
        attr->mtime.time.seconds = *tmp2;
        attr->mtime.set_it = 1;

        attr->uid.uid = 0;
        attr->uid.set_it = 1;

        attr->gid.gid = 0;
        attr->gid.set_it = 1;

        attr->mode.mode = EXCLUSIVE;
        attr->mode.set_it = 1;

        attr->size.size = 0;
        attr->size.set_it = 1;
#if 0
        DBUG("EXCLUSIVE create parent %s name %s verf %llu"
             " atime %u, mtime %u\n",
             CHKID_ARG(parent),
             args->where.name, (LLU)verf, atime, mtime);
#endif
}


void nfs3_create_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret, retry = 0;
        mode_t flags = O_RDWR | O_CREAT | O_EXCL;
        //uint32_t mode, mtime, atime, uid, gid;
        postop_fh *pfh;
        create_args *args;
        fileid_t *parent;
        create_ret res;
        sattr *attr;
        fileid_t fileid;
        setattr_t setattr;

        args = &nfs_arg->create_arg;
        parent = (fileid_t *)args->where.dir.val;

        DINFO(" parent "CHKID_FORMAT" name %s\n", CHKID_ARG(parent), args->where.name);

        res.status = NFS3_OK;

        if (args->how.mode != UNCHECKED)
                flags |= O_EXCL;

        if (args->how.mode != EXCLUSIVE) {
                /* NULL */
        }


        /* XXX need set this value according to file's access
         * mode. but I cann't get it here. maybe require SETATTR
         * proc. maybe ...  -gj
         */

        get_preopattr1(parent, &res.u.ok.dir_wcc.before);

        //DBUG("mode %u\n", mode);

        if (args->how.mode == EXCLUSIVE) {
                __nfs_create_exclusive_trans(args);
        }

        attr = &args->how.attr;

        __nfs_attr_trans(attr, &setattr, 1);
retry:
        ret = stor_mkvol(parent, args->where.name, &setattr, &fileid);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        ret = __nfs3_create_exclusive(args, &fileid);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                } else if (ret == ENOSPC || ret == EAGAIN) {
                        if (retry > MAX_RETRY) {
                                GOTO(err_ret, ret);
                        }

                        retry++;
                        schedule_sleep("nfs",5 * 1000 * 1000);
                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }


        DBUG("fileid "CHKID_FORMAT"\n", CHKID_ARG(&fileid));

        res.status = NFS3_OK;

        pfh = &res.u.ok.obj;
        pfh->handle.val = (void *)&fileid;
        pfh->handle.len = sizeof(fileid_t);
        pfh->handle_follows = TRUE;

        YASSERT(fileid.id != 0);

        get_postopattr1(&fileid, &res.u.ok.obj_attr);
        get_postopattr1(parent, &res.u.ok.dir_wcc.after);

        DBUG("status %d fhlen %u\n", res.status, pfh->handle.len);

        FREE_ARGS(_buf, create);
        sunrpc_reply_send(sockid, xid, (void *)xdr_createret, &res, ACCEPT_STATE_OK);

        return;
err_ret:
        DERROR("nfs create "CHKID_FORMAT"/%s fail ret:%d\n", CHKID_ARG(parent), args->where.name, ret);
        res.status = create_err(ret);
        res.u.fail.dir_wcc.after.attr_follow = FALSE;
        res.u.fail.dir_wcc.before.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_createret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, create);
        return;
}

void nfs3_mkdir_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret;
        preop_attr pre;
        post_op_attr post;
        mode_t mode;
        postop_fh *pfh;
        mkdir_args *args;
        mkdir_ret res;
        fileid_t *parent;
        setattr_t setattr;
        fileid_t fileid;

        args = &nfs_arg->mkdir_arg;
        parent = (fileid_t *)args->where.dir.val;

        DINFO("parent "CHKID_FORMAT" name %s\n", CHKID_ARG(parent), args->where.name);

        get_preopattr1(parent, &pre);

        (void) sattr_tomode(&mode, &args->attr);

        __nfs_attr_trans(&args->attr, &setattr, 1);
        ret = stor_mkpool(parent, args->where.name, &setattr, NULL);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        ret = EEXIST;
                        GOTO(err_ret, ret);

#if 0
                        ret = stor_isempty(parent, args->where.name,
                                             &empty, __S_IFDIR);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        if (empty) {
                                ret = stor_lookup(parent,
                                                    args->where.name,
                                                    &fileid);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);
                        } else {
                                ret = EEXIST;
                                GOTO(err_ret, ret);
                        }
#endif

                } else
                        GOTO(err_ret, ret);
        }

        ret = stor_lookup("default", parent, args->where.name, &fileid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 0
        __nfs_attr_trans(&args->attr, &setattr, 0);
        ret = vnode_setattr(&fileid, &setattr);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        pfh = &res.u.ok.obj;
        pfh->handle.val = (void *)&fileid;
        pfh->handle.len = sizeof(fileid_t);
        pfh->handle_follows = TRUE;

        res.status = NFS3_OK;

        get_postopattr1(&fileid, &res.u.ok.obj_attr);
        get_postopattr1(parent, &post);

        /* overlaps with resfail */
        res.u.ok.dir_wcc.before = pre;
        res.u.ok.dir_wcc.after = post;

        FREE_ARGS(_buf, mkdir);
        sunrpc_reply_send(sockid, xid, (void *)xdr_mkdirret, &res, ACCEPT_STATE_OK);

        return;
err_ret:
        DINFO("nfs mkdir "CHKID_FORMAT"/%s fail ret:%d\n", CHKID_ARG(parent), args->where.name, ret);
        res.status = mkdir_err(ret);
        res.u.fail.dir_wcc.before.attr_follow = FALSE;
        res.u.fail.dir_wcc.after.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_mkdirret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, mkdir);
        return;
}

inline static int __nfs3_remove(const fileid_t *parent, const char *name)
{
        int ret;
        fileid_t rootid, fid;
        uuid_t uuid;
        char buf[MAX_BUF_LEN], buf1[MAX_BUF_LEN];

retry:
        ret = stor_lookup1("default", "/.recycle", &fid);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        ret = stor_lookup1("default", "/", &rootid);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        ret = stor_mkpool(&rootid, ".recycle", NULL, &fid);
                        if (unlikely(ret)) {
                                if (ret == EEXIST) {
                                        goto retry;
                                } else
                                        GOTO(err_ret, ret);
                        } else
                                GOTO(err_ret, ret);
                }
        }

        uuid_generate(uuid);
        uuid_unparse(uuid, buf);
        snprintf(buf1, MAX_BUF_LEN, "%s_%s", name, buf);

        DINFO("rename %s to %s\n", name, buf1);

        ret = stor_rename("default", parent, name, &fid, buf1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

void nfs3_remove_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret;
        remove_ret res;
        remove_args *args;
        fileid_t *parent, fileid;

        args = &nfs_arg->remove_arg;
        parent = (fileid_t *)args->obj.dir.val;

        DINFO("parent "CHKID_FORMAT" name %s\n",
              CHKID_ARG(parent), args->obj.name);

        ret = stor_lookup("default", parent, args->obj.name, &fileid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        get_preopattr1(&fileid, &res.dir_wcc.before);

#if 1
        ret = stor_rmvol(parent, args->obj.name, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#else
        ret = __nfs3_remove(parent, args->obj.name);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        res.status = NFS3_OK;

        get_postopattr1(parent, &res.dir_wcc.after);

        FREE_ARGS(_buf, remove);
        sunrpc_reply_send(sockid, xid, (void *)xdr_removeret, &res, ACCEPT_STATE_OK);

        return;
err_ret:
        DERROR("nfs remove "CHKID_FORMAT"/%s fail ret:%d\n",
              CHKID_ARG(parent), args->obj.name, ret);
        res.status = remove_err(ret);
        res.dir_wcc.after.attr_follow = FALSE;
        res.dir_wcc.before.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_removeret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, remove);
        return;
}

void nfs3_rmdir_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret;
        rmdir_ret res;
        rmdir_args *args;
        fileid_t *parent, fileid;

        args = &nfs_arg->rmdir_arg;
        parent = (fileid_t *)args->obj.dir.val;

        DINFO("parent "CHKID_FORMAT" name %s\n",
              CHKID_ARG(parent), args->obj.name);

        ret = stor_lookup("default", parent, args->obj.name, &fileid);
        if (unlikely(ret)) {
                DWARN("parent "CHKID_FORMAT" name %s\n", CHKID_ARG(parent), args->obj.name);
                GOTO(err_ret, ret);
        }

        get_preopattr1(&fileid, &res.dir_wcc.before);

        ret = stor_rmpool("default", parent, args->obj.name);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        res.status = NFS3_OK;

        /* overlaps with resfail */
        get_postopattr1(parent, &res.dir_wcc.after);

        FREE_ARGS(_buf, rmdir);
        sunrpc_reply_send(sockid, xid, (void *)xdr_rmdirret, &res, ACCEPT_STATE_OK);

        return;
err_ret:
        DERROR("nfs rmdir "CHKID_FORMAT"/%s fail ret:%d\n",
              CHKID_ARG(parent), args->obj.name, ret);
        res.status = rmdir_err(ret);
        res.dir_wcc.before.attr_follow = FALSE;
        res.dir_wcc.after.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_rmdirret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, rmdir);
        return;
}

void nfs3_readdir_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret;
        readdir_ret res;
        readdir_args *args;
        fileid_t *fileid;
        entry *entrys;
        char *patharray;
        void *ptr;

        DINFO("readdir\n");

        args = &nfs_arg->readdir_arg;
        fileid = (fileid_t *)args->dir.val;

        ret = ymalloc(&ptr, (sizeof(entry) + MAX_PATH_LEN)
                      * MAX_ENTRIES);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        entrys = ptr;
        ptr += sizeof(entry) * MAX_ENTRIES;
        patharray = ptr;

        ret = read_dir(fileid, args->cookie, args->cookieverf,
                       args->count, &res, entrys, patharray);
        if (unlikely(ret))
                GOTO(err_free, ret);

        res.status = NFS3_OK;

        get_postopattr1(fileid, &res.u.ok.dir_attr);

        FREE_ARGS(_buf, readdir);
        sunrpc_reply_send(sockid, xid, (void *)xdr_readdirret, &res, ACCEPT_STATE_OK);

        yfree((void **)&entrys);

        return;
err_free:
        yfree((void **)&entrys);
err_ret:
        DERROR("nfs readdir "CHKID_FORMAT" fail ret:%d\n", CHKID_ARG(fileid), ret);
        res.status = readdir_err(ret);
        res.u.fail.dir_attr.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_readdirret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, readdir);
        return;
}

void nfs3_readdirplus_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret;
        readdirplus_ret res;
        readdirplus_args *args;
        fileid_t *fileid;
        entryplus *entrys;
        char *patharray;
        fileid_t *fharray;
        void *ptr;

        args = &nfs_arg->readdirplus_arg;
        fileid = (fileid_t *)args->dir.val;

        DINFO("redirplus\n");

        ret = ymalloc(&ptr, (sizeof(entryplus) + sizeof(fileid_t) + MAX_NAME_LEN )
                      * MAX_DIRPLUS_ENTRIES);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        entrys = ptr;
        ptr += sizeof(entryplus) * MAX_DIRPLUS_ENTRIES;
        fharray = ptr;
        ptr += sizeof(fileid_t) * MAX_DIRPLUS_ENTRIES;
        patharray = ptr;

        ret = readdirplus(fileid, args->cookie, args->cookieverf,
                          args->dircount, &res, entrys, patharray,
                          fharray);
        if (unlikely(ret))
                GOTO(err_free, ret);

        res.status = NFS3_OK;

        get_postopattr1(fileid, &res.u.ok.dir_attr);

        FREE_ARGS(_buf, readdirplus);

        sunrpc_reply_send(sockid, xid, (void *)xdr_readdirplusret, &res, ACCEPT_STATE_OK);

        yfree((void **)&entrys);

        return;

err_free:
        yfree((void **)&entrys);
err_ret:
        DERROR("nfs readdirplus "CHKID_FORMAT" fail ret:%d\n", CHKID_ARG(fileid), ret);
        res.status = readdir_err(ret);
        res.u.fail.dir_attr.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_readdirplusret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, readdirplus);
        return;
}

void nfs3_fsstat_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret;
        fsstat_ret res;
        struct statvfs svbuf;
        fsstat_args *args;
        fileid_t *fileid;

        args = &nfs_arg->fsstat_arg;
        fileid = (fileid_t *)args->stor_root.val;

        /* overlaps with resfail */
        get_postopattr1(fileid, &res.u.ok.attr);

        ret = stor_statvfs("default", fileid, &svbuf);
        if (unlikely(ret)) {
                DWARN("fileid "CHKID_FORMAT"\n",CHKID_ARG(fileid));
                GOTO(err_ret, ret);
        } else {
                res.status = NFS3_OK;
                res.u.ok.tbytes = (uint64_t) svbuf.f_blocks * svbuf.f_bsize;
                res.u.ok.fbytes = (uint64_t) svbuf.f_bfree * svbuf.f_bsize;
                res.u.ok.abytes = (uint64_t) svbuf.f_bavail * svbuf.f_bsize;
                res.u.ok.tfiles = (uint64_t) svbuf.f_files;
                res.u.ok.ffiles = (uint64_t) svbuf.f_ffree;
                res.u.ok.afiles = (uint64_t) svbuf.f_ffree;
                res.u.ok.invarsec = 0;
        }

        FREE_ARGS(_buf, fsstat);

        sunrpc_reply_send(sockid, xid, (void *)xdr_fsstatret, &res, ACCEPT_STATE_OK);

        return;
err_ret:
        DWARN("nfs fsstat fileid "CHKID_FORMAT" fail ret:%d\n",CHKID_ARG(fileid), ret);
        res.status = NFS3_EIO;
        res.u.fail.attr.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_fsstatret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, fsstat);
        return;
}

void nfs3_fsinfo_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        fsinfo_ret res;
        fsinfo_args *args;
        fileid_t *fileid;

        args = &nfs_arg->fsinfo_arg;
        fileid = (fileid_t *)args->stor_root.val;

        DBUG("fh len %u\n", args->stor_root.len);

        get_postopattr1(fileid, &res.u.ok.obj_attr);

        res.status = NFS3_OK;

        res.u.ok.rtmax = nfsconf.rsize;
        res.u.ok.rtpref = nfsconf.rsize;
        res.u.ok.rtmult = 4096;
        res.u.ok.wtmax = nfsconf.wsize;
        res.u.ok.wtpref = nfsconf.wsize;
        res.u.ok.wtmult = 4096;
        res.u.ok.dtpref = 4096;
        res.u.ok.maxfilesize = ((LLU)1024 * 1024 * 1024 * 1024 * 10);
        res.u.ok.time_delta.seconds = 1;
        res.u.ok.time_delta.nseconds = 0;
        //res.u.ok.properties = NFSINFO_HOMOGENEOUS | NFSINFO_SYMLINK | NFSINFO_LINK;
        res.u.ok.properties = NFSINFO_HOMOGENEOUS;

        FREE_ARGS(_buf, fsinfo);
        sunrpc_reply_send(sockid, xid, (void *)xdr_fsinforet, &res, ACCEPT_STATE_OK);

        return;
}

void nfs3_pathconf_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        pathconf3_res res;
        pathconf3_args * args;
        fileid_t *fileid;

        args = &nfs_arg->pathconf_arg;
        fileid = (fileid_t *)args->object.val;

        get_postopattr1(fileid, &res.pathconf3_res_u.resok.obj_attributes);

        res.status = NFS3_OK;
        res.pathconf3_res_u.resok.linkmax = 0xFFFFFFFF;
        res.pathconf3_res_u.resok.name_max = NFS_PATHLEN_MAX;
        res.pathconf3_res_u.resok.no_trunc = TRUE;
        res.pathconf3_res_u.resok.chown_restricted = FALSE;
        res.pathconf3_res_u.resok.case_insensitive = FALSE;
        res.pathconf3_res_u.resok.case_preserving = TRUE;

        FREE_ARGS(_buf, pathconf3_);
        sunrpc_reply_send(sockid, xid, (void *)xdr_pathconf3_ret, &res, ACCEPT_STATE_OK);

        return;
}

void nfs3_commit_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        commit_ret res;
        commit_args *args;
        fileid_t *fileid;

        args = &nfs_arg->commit_arg;
        fileid = (fileid_t *)args->file.val;

        res.status = NFS3_OK;

        get_preopattr1(fileid, &res.u.ok.file_wcc.before);
        get_postopattr1(fileid, &res.u.ok.file_wcc.after);

        FREE_ARGS(_buf, commit);
        sunrpc_reply_send(sockid, xid, (void *)xdr_commitret, &res, ACCEPT_STATE_OK);

        return;
}

/**
 * no need to verify target path
 */
void nfs3_symlink_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret;
        symlink_args *args;
        symlink_res res;
        fileid_t *parent, fileid;
        postop_fh *pfh;
        char buf[MAX_BUF_LEN];
        uint32_t mode;

        args = &nfs_arg->symlink_arg;

        ret = ENOSYS;
        GOTO(err_ret, ret);

        // link
        parent = (fileid_t *)args->where.dir.val;

        DBUG("link %s data %s\n", args->where.name, args->symlink.symlink_data);

        (void) sattr_tomode(&mode, &args->symlink.symlink_attributes);

        ret = ENOSYS;
        GOTO(err_ret, ret);
        //ret = stor_symlink(args->symlink.symlink_data, parent, args->where.name, mode);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        ret = stor_lookup("default", parent, args->where.name, &fileid);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        //buflen = MAX_BUF_LEN;

#if 0
                        ret = stor_readlink(&fileid, buf, &buflen);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
#endif

                        if (strcmp(buf, args->symlink.symlink_data) != 0)
                                GOTO(err_ret, ret);
                } else {
                        DWARN("link %s data %s\n", args->where.name, args->symlink.symlink_data);
                        GOTO(err_ret, ret);
                }
        }

        ret = stor_lookup("default", parent, args->where.name, &fileid);
        if (unlikely(ret)) {
                DWARN("lookup %s form "CHKID_FORMAT"\n", args->where.name,
                      CHKID_ARG(parent));
                GOTO(err_ret, ret);
        }

        res.status = NFS3_OK;

        pfh = &res.u.ok.obj;
        pfh->handle.val = (void *)&fileid;
        pfh->handle.len = sizeof(fileid_t);
        pfh->handle_follows = TRUE;

        res.u.ok.dir_wcc.before.attr_follow = FALSE;
        get_postopattr1(&fileid, &res.u.ok.obj_attributes);
        get_postopattr1(parent, &res.u.ok.dir_wcc.after);

        FREE_ARGS(_buf, symlink);
        sunrpc_reply_send(sockid, xid, (void *)xdr_symlinkret, &res, ACCEPT_STATE_OK);

        return;
err_ret:
        DERROR("nfs symlink %s data %s fail ret:%d\n", args->where.name, args->symlink.symlink_data, ret);
        res.status = symlink_err(ret);
        res.u.fail.dir_wcc.after.attr_follow = FALSE;
        res.u.fail.dir_wcc.before.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_symlinkret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, symlink);
        return;
}

void nfs3_link_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret;
        LINK3args *args;
        LINK3res res;
        fileid_t *parent, *fileid;

        args = &nfs_arg->link3arg;

        fileid = (fileid_t *)args->file.val;

        parent = (fileid_t *)args->link.dir.val;

        get_preopattr1(parent, &res.u.ok.linkdir_wcc.before);

        DBUG("parent "CHKID_FORMAT" name %s\n", CHKID_ARG(parent), args->link.name);

        ret = ENOSYS;
        GOTO(err_ret, ret);
        //ret = stor_link2node(fileid, parent, args->link.name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        res.status = NFS3_OK;

        get_postopattr1(parent, &res.u.ok.linkdir_wcc.after);
        get_postopattr1(fileid, &res.u.ok.file_attributes);

        FREE_ARGS(_buf, LINK3);

        sunrpc_reply_send(sockid, xid, (void *)xdr_linkret, &res, ACCEPT_STATE_OK);

        return;
err_ret:
        DERROR("nfs link "CHKID_FORMAT"/%s fail ret:%d\n", CHKID_ARG(parent), args->link.name, ret);
        res.status = NFS3_EIO;
        res.u.fail.linkdir_wcc.after.attr_follow = FALSE;
        res.u.fail.linkdir_wcc.before.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_linkret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, LINK3);
        return;
}

void nfs3_readlink_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret;
        readlink_args *args;
        readlink_res res;
        fileid_t *fileid;
        char buf[MAX_BUF_LEN];
//        uint32_t buflen = MAX_BUF_LEN;

        args = &nfs_arg->readlink_arg;

        fileid = (fileid_t *)args->symlink.val;

        ret = ENOSYS;
        GOTO(err_ret, ret);
        //ret = stor_readlink(fileid, buf, &buflen);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* just a pointer */
        res.status = NFS3_OK;
        res.u.ok.data = buf;

        get_postopattr1(fileid, &res.u.ok.symlink_attributes);
        FREE_ARGS(_buf, readlink);
        sunrpc_reply_send(sockid, xid, (void *)xdr_readlinkret, &res, ACCEPT_STATE_OK);

        return;
err_ret:
        DERROR("nfs readlink "CHKID_FORMAT" fail ret:%d\n", CHKID_ARG(fileid), ret);
        res.status = NFS3_EIO;
        res.u.fail.symlink_attributes.attr_follow = FALSE;
        sunrpc_reply_send(sockid, xid, (void *)xdr_readlinkret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, readlink);
        return;
}

void acl_null_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        (void) nfs_arg;
        (void) _buf;
        sunrpc_reply_send(sockid, xid, NULL, NULL, ACCEPT_STATE_ERROR);
}

void mount_null_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        (void) nfs_arg;
        (void) _buf;
        sunrpc_reply_send(sockid, xid, NULL, NULL, ACCEPT_STATE_OK);
}

#define xdr_mountargs xdr_dirpath
#define xdr_umountargs xdr_dirpath

static int __mount_mnt(const char *name, poolid_t *poolid)
{
        int ret;
        poolid_t parent;
        char path[MAX_NAME_LEN];

retry:
        ret = stor_lookup1("default", "/nfs", &parent);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        DINFO("create root\n");
                        
                        ret = stor_lookup1("default", "/", &parent);
                        if (ret)
                                GOTO(err_ret, ret);

                        ret = stor_mkpool(&parent, "nfs", NULL, NULL);
                        if (ret) {
                                if (ret == EEXIST) {
                                } else
                                        GOTO(err_ret, ret);
                        }

                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }

        snprintf(path, MAX_NAME_LEN, "/nfs%s", name);
        ret =stor_lookup1("default", path, poolid);
        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

void mount_mnt_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret = 0, retry = 0;
        static mount_ret res;
        fileid_t fileid;
        static int auth = AUTH_UNIX;
        char **args;
        uint32_t mode;
        struct stat stbuf;

        _memset(&res, 0x0, sizeof(mount_ret));

        args = &nfs_arg->mnt_arg;

        DINFO("mount %s\n", *args);;

retry:
        ret =__mount_mnt(*args, &fileid);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 100, (1000 * 1000));
                } else {
                        res.fhs_status = MNT_EACCES;
                        ret = EPERM;
                        GOTO(err_ret, ret);;
                }
        }

        ret = stor_getattr("default", &fileid, &stbuf);
        if (unlikely(ret)) {
                res.fhs_status = MNT_EACCES;
                ret = EPERM;
                GOTO(err_ret, ret);;
        }

        mode = stbuf.st_mode;
        if (!S_ISDIR(mode)) {
                res.fhs_status = MNT_EACCES;
                DWARN("path %s, mode %o\n", *args, mode);
                ret = EPERM;
                GOTO(err_ret, ret);;
        }

        DINFO("mount %s ok, res %p\n", *args, &res);

        res.fhs_status = MNT_OK;

        res.u.mountinfo.fhandle.len = sizeof(fileid_t);
        res.u.mountinfo.fhandle.val = (char *)&fileid;
        //res.u.mountinfo.handle_follows = TRUE;
        res.u.mountinfo.auth_flavors.len = 0;
        res.u.mountinfo.auth_flavors.val = &auth;

        sunrpc_reply_send(sockid, xid, (void *)xdr_mountret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, mount);

        return;
err_ret:
        res.fhs_status = MNT_EPERM;
        sunrpc_reply_send(sockid, xid, (void *)xdr_mountret, &res, ACCEPT_STATE_OK);
        FREE_ARGS(_buf, mount);
        return;
}

void mount_umnt_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        /* RPC times out if we use a NULL pointer */
        char **args;

        args = &nfs_arg->umnt_arg;

        DBUG("umount\n");

        //remove_mount(*args, get_remote((void *)job->net));

        FREE_ARGS(_buf, umount);

        sunrpc_reply_send(sockid, xid, NULL, NULL, ACCEPT_STATE_OK);
}

static int __nfs3_export()
{
        int ret, delen, done = 0;
        uint64_t offset = 0, offset2 = 0;
        struct dirent *de;
        void *de0;
        fileid_t fid;
        char uuid[MAX_NAME_LEN] = {}, path[MAX_PATH_LEN];

        ret = stor_lookup1("default", NFS_ROOT, &fid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        get_uuid(uuid);
        ret = stor_listpool_open(&fid, uuid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        de0 = NULL;
        while (done == 0) {
                ret = stor_listpool(&fid, uuid, offset, &de0, &delen);
                if (unlikely(ret))
                        GOTO(err_close, ret);

                if (delen == 0)
                        break;

                offset2 = 0;
                dir_for_each(de0, delen, de, offset2) {
                        if (strlen(de->d_name) == 0) {
                                done = 1;
                                goto out;
                        } else if (delen - offset2 < sizeof(*de) + MAX_NAME_LEN) {
                                break;
                        }

                        offset += de->d_reclen;

                        snprintf(path, MAX_PATH_LEN, "/%s", de->d_name);
                        get_nfs_export(path, "*", "rw,sync,no_root_squash");
                }

                yfree((void **)&de0);
        }

out:
        if (de0)
                yfree((void **)&de0);

        stor_listpool_close(&fid, uuid);
        return 0;
err_close:
        stor_listpool_close(&fid, uuid);
err_ret:
        return ret;
}

void mount_export_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret, i, len;
        static struct exportnode *export_list = 0, *resnode;
        void *ptr;
        struct nfsconf_export_t *export;

        (void) nfs_arg;
        (void) _buf;

        ret = __nfs3_export();
        if (unlikely(ret)) {
                DERROR("nfs3 export error:%d\n", ret);
                goto out;
        }

        if (nfsconf.export_size == 0) {
                goto out;
        }

        if (export_list == 0) {
                ret = ymalloc(&ptr, sizeof(exportnode) * nfsconf.export_size);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                export_list = ptr;

                for (i = 0; i < nfsconf.export_size; i++) {
                        export = &nfsconf.nfs_export[i];
                        resnode = &export_list[i];

                        len = sizeof(export->path);
                        YASSERT(len < MAX_PATH_LEN);
                        ret = ymalloc(&ptr, len + 1);
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);

                        _strncpy(ptr, export->path, len);

                        resnode->ex_dir = ptr;

                        ret = ymalloc(&ptr, sizeof(groupnode));
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);

                        resnode->ex_groups = ptr;

                        len = sizeof(export->ip);
                        YASSERT(len < MAX_PATH_LEN);
                        ret = ymalloc(&ptr, len);
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);

                        _strncpy(ptr, export->ip, len);

                        resnode->ex_groups->gr_name = ptr;
                        resnode->ex_groups->gr_next = NULL;

                        if (i < nfsconf.export_size - 1)
                                resnode->ex_next = &export_list[i + 1];
                        else
                                resnode->ex_next = NULL;
                }
        }

out:
        sunrpc_reply_send(sockid, xid, (xdr_ret_t)xdr_exports, &export_list, ACCEPT_STATE_OK);
}

void mount_dump_svc(const sockid_t *sockid, uint32_t xid, nfs_arg_t *nfs_arg, buffer_t *_buf)
{
        int ret, i, len;
        static struct mountbody *dump_list = 0, *resnode;
        void *ptr;
        struct nfsconf_export_t *export;

        (void) nfs_arg;
        (void) _buf;

        UNIMPLEMENTED(__DUMP__);

        if (dump_list == 0) {
                ret = ymalloc(&ptr, sizeof(struct mountbody) * nfsconf.export_size);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                dump_list = ptr;

                for (i = 0; i < nfsconf.export_size; i++) {
                        export = &nfsconf.nfs_export[i];
                        resnode = &dump_list[i];

                        len = sizeof(export->path);
                        YASSERT(len < MAX_PATH_LEN);
                        ret = ymalloc(&ptr, len + 1);
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);

                        _strncpy(ptr, export->path, len);

                        resnode->ml_directory = ptr;

                        len = sizeof(export->ip);
                        YASSERT(len < MAX_PATH_LEN);
                        ret = ymalloc(&ptr, len);
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);

                        _strncpy(ptr, export->ip, len);

                        resnode->ml_hostname = ptr;

                        if (i < nfsconf.export_size - 1)
                                resnode->ml_next = &dump_list[i + 1];
                        else
                                resnode->ml_next = NULL;
                }
        }

        sunrpc_reply_send(sockid, xid, (xdr_ret_t)xdr_exports, &dump_list, ACCEPT_STATE_OK);
}
